package com.atguigu.flink.timer;

import com.atguigu.flink.function.WaterSensorMapFunction;
import com.atguigu.flink.pojo.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

/**
 * Created by Smexy on 2023/11/15
 *

 需要保存的数据(状态)有:
    基于事件时间
   1.定时器的时间
   2.记录上一个传感器的水位
   3.提供一个标记，来控制定时器的创建。只有当前没有定时器时，才创建


    操作时，只使用一种key
 */
public class Demo3_TimerExec
{
    public static void main(String[] args) {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

        WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy
            .<WaterSensor>forMonotonousTimestamps()
            .withTimestampAssigner( (e, ts) -> e.getTs());

        env
            .socketTextStream("hadoop102", 8888)
            .map(new WaterSensorMapFunction())
            .assignTimestampsAndWatermarks(watermarkStrategy)
            .keyBy(WaterSensor::getId)
            .process(new KeyedProcessFunction<String, WaterSensor, String>()
            {
                long time ;
                boolean ifNeedTimer = true;
                int lastVc ;

                @Override
                public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
                    String key = ctx.getCurrentKey();
                    TimerService timerService = ctx.timerService();

                    if (ifNeedTimer){
                        //创建定时器
                        time =  ctx.timestamp() + 5000;
                        timerService.registerEventTimeTimer(time);
                        System.out.println(key +"指定了以下时间的定时器:"+time);
                        ifNeedTimer = false;
                        //记录当前传感器的水位
                        lastVc = value.getVc();
                    }else if (value.getVc() < lastVc){
                        //水位下降
                        timerService.deleteEventTimeTimer(time);
                        System.out.println(key +"删除了以下时间的定时器:"+time);
                        //一切重置归0
                        ifNeedTimer = true;
                        lastVc = 0;
                    }else {
                        //记录当前传感器的水位
                        lastVc = value.getVc();

                    }

                }

                //定时器到点执行的程序
                @Override
                public void onTimer(long timestamp,OnTimerContext ctx, Collector<String> out) throws Exception {
                    out.collect(ctx.getCurrentKey() +" 到了"+timestamp+"点,5s内水位连续上升,预警......");
                    //一切重置归0
                    ifNeedTimer = true;
                    lastVc = 0;
                }



            })
            .print();


        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}
